package com.atguigu.flink.watermark;

import com.atguigu.flink.function.WaterSensorMapFunction;
import com.atguigu.flink.pojo.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Created by Smexy on 2023/11/15
 *
 * <p>Window-join demo. Two socket text streams are mapped to {@link WaterSensor}
 * records, given event-time timestamps taken from {@code WaterSensor.getTs()},
 * and then joined on {@code WaterSensor.getId()} inside tumbling event-time
 * windows. Each joined pair is printed as a single string.
 *
 * <p>Conceptually: {@code t1 join t2 on t1.id = t2.id}, evaluated per window
 * (join first, then window).
 */
public class Demo8_WindowJoin {

    /** Host both socket sources connect to. */
    private static final String SOCKET_HOST = "hadoop102";

    /** Port of the first (left) input stream. */
    private static final int LEFT_PORT = 8888;

    /** Port of the second (right) input stream. */
    private static final int RIGHT_PORT = 8889;

    /** Value written to the {@code rest.port} configuration key. */
    private static final int REST_PORT = 3333;

    /** Length of the tumbling event-time window, in seconds. */
    private static final long WINDOW_SECONDS = 5;

    /**
     * Builds the two input streams, wires up the window join and runs the job.
     * Any exception thrown while executing the job is printed to standard error.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Configuration restConf = new Configuration();
        restConf.setInteger("rest.port", REST_PORT);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(restConf);
        env.setParallelism(1);

        // Monotonously increasing timestamps; the event time is read from the record itself.
        WatermarkStrategy<WaterSensor> eventTimeStrategy = WatermarkStrategy
            .<WaterSensor>forMonotonousTimestamps()
            .withTimestampAssigner((sensor, previousTs) -> sensor.getTs());

        // Prepare the two streams.
        SingleOutputStreamOperator<WaterSensor> ds1 = sensorStream(env, LEFT_PORT, eventTimeStrategy);
        SingleOutputStreamOperator<WaterSensor> ds2 = sensorStream(env, RIGHT_PORT, eventTimeStrategy);

        ds1
            .join(ds2)                        // ds1 join ds2
            .where(WaterSensor::getId)        // on ds1.id
            .equalTo(WaterSensor::getId)      //  = ds2.id
            .window(TumblingEventTimeWindows.of(Time.seconds(WINDOW_SECONDS)))
            // apply plays the same role here as process does on other windowed streams
            .apply(new PairFormatter())
            .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Creates one timestamped sensor stream from a socket on {@link #SOCKET_HOST}.
     *
     * @param env      environment the source is registered on
     * @param port     socket port to read text lines from
     * @param strategy watermark strategy applied after mapping
     * @return the mapped stream with timestamps and watermarks assigned
     */
    private static SingleOutputStreamOperator<WaterSensor> sensorStream(
            StreamExecutionEnvironment env,
            int port,
            WatermarkStrategy<WaterSensor> strategy) {
        // NOTE(review): WaterSensorMapFunction presumably parses each text line into a
        // WaterSensor - confirm against its definition.
        return env
            .socketTextStream(SOCKET_HOST, port)
            .map(new WaterSensorMapFunction())
            .assignTimestampsAndWatermarks(strategy);
    }

    /**
     * Formats one joined pair. The join method is called once for every pair
     * (first, second) that falls into the same window and has first.id = second.id.
     */
    private static final class PairFormatter implements JoinFunction<WaterSensor, WaterSensor, String> {
        @Override
        public String join(WaterSensor first, WaterSensor second) throws Exception {
            return first + "=====" + second;
        }
    }
}
